package com.jie.flink.cdc.service;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author zhanggj
 * @date 2023/5/25 18:21
 * @desc
 */
@Slf4j
@Data
public abstract class FlinkCdcService {

    private String jobName;
    private List<Sink<String>> sinkList;
    private Integer runOrder;
    private static final Long SLEEP_SECOND_MULTIPLE = 10L;

    public void run() {
        CompletableFuture.runAsync(() -> {
            try {
                log.info("start flink-stream:{} will sleep {} second", jobName, runOrder * SLEEP_SECOND_MULTIPLE);
                TimeUnit.SECONDS.sleep(runOrder * SLEEP_SECOND_MULTIPLE);
            } catch (InterruptedException e) {
                log.error("start flink-stream:{} sleep exception", jobName, e);
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.disableOperatorChaining();
            env.enableCheckpointing(6000L);
            // 配置checkpoint 超时时间
            //env.getCheckpointConfig().setCheckpointTimeout(Duration.ofMinutes(1).toMillis());
            // 避免扫描快照超时
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
            // 此配置和数据重放长度有关，当配置snapshot.mode未never（推荐）时，服务重启后会重放服务关闭前CheckpointInterval的数据
            env.getCheckpointConfig().setCheckpointInterval(Duration.ofSeconds(10).toMillis());
            //指定 CK 的一致性语义
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            //设置任务关闭的时候保留最后一次 CK 数据
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // 指定从 CK 自动重启策略
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));
            //设置状态后端
            env.setStateBackend(new HashMapStateBackend());

            final DataStreamSource<String> dataStream = buildFlinkStreamSource(env);
            this.sinkList.forEach(sink -> {
                dataStream.sinkTo(sink).name(getJobName().concat("_").concat(sink.getClass().getSimpleName()));
            });
            try {
                env.execute(getJobName());
            } catch (Exception e) {
                log.error("execute flink-cdc exception:", e);
            }
            log.info("任务{}，已经启动，开始执行", this.jobName);
        });

    }

    /**
     * 添加数据flink采集的源
     * @param env
     * @return
     */
    protected abstract DataStreamSource<String> buildFlinkStreamSource(final StreamExecutionEnvironment env);

}
